fix(core): make appendcol row ordering deterministic on parallel engines #5474
appendcol lowers to a FULL JOIN of two ROW_NUMBER() OVER () windows (empty PARTITION BY / ORDER BY) on _row_number_main_ = _row_number_subsearch_, with no trailing sort. That positional zip is only correct on a serial, order-preserving executor: a bare ROW_NUMBER() OVER () assigns sequence numbers in input order and the join preserves it. On a parallel/distributed backend the row-number assignment is arbitrary and the hash join drops ordering, so columns get zipped onto the wrong rows and downstream `head` slices a non-deterministic subset.

Fix visitAppendCol to not depend on implicit input-order preservation:

- derive an explicit window ORDER BY from each child's collation (deriveCollationOrderKeys), so ROW_NUMBER assignment follows the upstream sort; falls back to the prior bare OVER () when the input has no collation (positional correspondence is undefined without a sort).
- add a trailing sort by the row-number columns after the join (NULLS LAST, same pattern as streamstats) so output order is deterministic regardless of how the backend executes the join.

No behavior change on the serial v2/Calcite engine; makes the lowering correct on parallel backends. Updates CalcitePPLAppendcolTest expected plans/SparkSQL.

Signed-off-by: Kai Huang <ahkcs@amazon.com>
094e05e to 41ef5f7
Persistent review updated to latest commit 41ef5f7
        + " `t`.`COMM`, `t`.`DEPTNO`\n"
        + "FROM (SELECT `EMPNO`, `ENAME`, `JOB`, `MGR`, `HIREDATE`, `SAL`, `COMM`, `DEPTNO`,"
-       + " ROW_NUMBER() OVER () `_row_number_main_`\n"
+       + " ROW_NUMBER() OVER (ORDER BY `EMPNO` NULLS LAST) `_row_number_main_`\n"
Our helper calls `getMetadataQuery().collations(peek())`, which for `LogicalTableScan(scott.EMP)` returns the table's declared collation `[EMPNO ASC NULLS LAST]` (SCOTT models EMPNO as the primary key). We thread that into the window's `OVER (...)`, and `OVER (ORDER BY <known input order>)` forces deterministic `ROW_NUMBER` assignment.
What's broken
`appendcol` pairs the wrong rows together on parallel engines (analytics engine via DataFusion, Spark pushdown), and rows come out in scrambled order.

Internally, `appendcol` lowers to a `FULL JOIN` of two bare `ROW_NUMBER() OVER ()` windows on `_row_number_main_ = _row_number_subsearch_`, with no trailing sort.

That only works on a single-threaded engine. On a parallel one, row numbers are assigned arbitrarily and the join then glues the wrong subsearch row onto the wrong main row.
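The failure mode can be reproduced in miniature. The sketch below is a hypothetical Java model, not the plugin's code: `PositionalZipSketch` and `fullJoinOnRowNumber` are names invented for illustration, and the row numbers are passed in explicitly to stand in for whatever the engine happens to assign.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of the appendcol lowering; not the plugin's code. */
final class PositionalZipSketch {

  /**
   * FULL JOIN on row number. mainRn[i] is the number assigned to main.get(i),
   * and subRn[i] is the number assigned to sub.get(i).
   */
  static List<String> fullJoinOnRowNumber(
      List<String> main, int[] mainRn, List<String> sub, int[] subRn) {
    Map<Integer, String> subByRn = new HashMap<>();
    for (int i = 0; i < sub.size(); i++) {
      subByRn.put(subRn[i], sub.get(i));
    }
    List<String> out = new ArrayList<>();
    for (int i = 0; i < main.size(); i++) {
      // remove() so that unmatched subsearch rows can be emitted afterwards
      String match = subByRn.remove(mainRn[i]);
      out.add(main.get(i) + " <-> " + match);
    }
    for (String leftover : subByRn.values()) {
      out.add("null <-> " + leftover);
    }
    return out;
  }

  public static void main(String[] args) {
    List<String> main = List.of("F/AK", "F/AL", "F/AR", "M/MD");
    List<String> sub = List.of("F", "M");
    // Serial engine: numbers follow input order, so F/AK pairs with F.
    System.out.println(
        fullJoinOnRowNumber(main, new int[] {1, 2, 3, 4}, sub, new int[] {1, 2}));
    // Parallel engine: one arbitrary numbering that a bare OVER () permits.
    System.out.println(
        fullJoinOnRowNumber(main, new int[] {3, 4, 2, 1}, sub, new int[] {2, 1}));
  }
}
```

With the second numbering, `F/AK` is paired with nothing and `M/MD` picks up a subsearch value, which is the kind of mispairing described above. Sorting that output afterwards would reorder the pairs but could not repair them.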
Root cause, step by step (using the IT data)
Step 1: what `appendcol` does mechanically

The `testAppendCol` query has a main search and a subsearch.

Main (after `stats sum(age) by gender, state | sort gender, state`): many rows, one per `(gender, state)`. F rows come first alphabetically, then M rows.

Subsearch (`stats count(age) by gender | sort gender`): exactly 2 rows.

`appendcol` (1) numbers each main row, (2) numbers each subsearch row, (3) `FULL JOIN`s on those row numbers.

Step 2: on v2 (serial), row numbers follow input order

Join on `_rn_main_ = _rn_sub_`.

After `head 10`: `F/AK, F/AL, F/AR, F/AZ, F/CA, F/CO, F/CT, F/DC, F/DE, F/FL`, exactly what the IT asserts.

Step 3: on the analytics route (DataFusion, parallel), row numbers are arbitrary
`ROW_NUMBER() OVER ()` ignores input order on a parallel engine, so the assignment is effectively random. Running the same `FULL JOIN` on such an assignment matches the actual IT failure: M/MD leaked into the top 10 (sort order was lost too), and the `cnt` values landed on the wrong main rows entirely.

Step 4: a downstream sort can't fix this

What if we add `| sort gender, state` at the end to "fix the order"?

A sort only reorders tuples; it can't pry one open and swap `cnt` values between rows. The exact tuple `[F, AK, 317, 493]` the IT asserts doesn't exist anywhere in the broken output: no row has `F/AK` paired with `493`. No `verifyDataRowsInAnyOrder` or trailing sort can recover that.

Step 5: the fix, applied to the same scenario
The wrong pairing is committed inside `visitAppendCol` (Step 3). Once those tuples exist, no downstream PPL command can undo them, so the lowering itself must produce deterministic row numbers. Two changes do that:

(a) Fill the window's `OVER (???)` with the upstream sort's collation.

DataFusion is now obliged to sort by those keys before numbering, so it assigns the same numbers v2 would.

The `FULL JOIN` on row numbers now pairs correctly: `F/AK ↔ F(493)`, `F/AL ↔ M(507)`, rest `null`. The tuples are right: `[F, AK, 317, 493]` and `[F, AL, 397, 507]` exist in memory.

(b) Add a trailing `sort by _rn_main_, _rn_sub_ (NULLS LAST)` after the join.

DataFusion's hash join can still scramble output order even when the individual tuples are correct. The trailing sort puts them back in row-number order, which (by piece (a)) is the upstream sort order.

After `head 10`: the same 10 F-rows v2 produced (`F/AK … F/FL`), with the right `cnt` on each. The IT now passes 2/2 on the analytics route.

Both pieces are required: (a) alone leaves the post-join output scrambled by the hash join; (b) alone sorts by row numbers that are themselves random.
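How the two pieces combine can be sketched in plain Java. This is an illustrative model under stated assumptions, not the code in `visitAppendCol`: the class and method names are hypothetical, rows are modelled as distinct strings whose natural order stands in for the upstream sort keys, and the arrival order of `main` stands in for whatever order a parallel engine delivers rows in.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of the fixed lowering; names are illustrative only. */
final class DeterministicAppendColSketch {

  /** One joined output row; a null row number marks the unmatched side. */
  static final class Joined {
    final Integer rnMain;
    final String main;
    final Integer rnSub;
    final String sub;

    Joined(Integer rnMain, String main, Integer rnSub, String sub) {
      this.rnMain = rnMain;
      this.main = main;
      this.rnSub = rnSub;
      this.sub = sub;
    }
  }

  /** Piece (a): ROW_NUMBER() OVER (ORDER BY row). Assumes rows are distinct. */
  static Map<String, Integer> numberBySortKey(List<String> rows) {
    List<String> sorted = new ArrayList<>(rows);
    Collections.sort(sorted);
    Map<String, Integer> rowNumbers = new HashMap<>();
    for (int i = 0; i < sorted.size(); i++) {
      rowNumbers.put(sorted.get(i), i + 1);
    }
    return rowNumbers;
  }

  static List<String> appendCol(List<String> main, List<String> sub) {
    Map<String, Integer> mainRn = numberBySortKey(main);
    Map<Integer, String> subByRn = new HashMap<>();
    numberBySortKey(sub).forEach((row, rn) -> subByRn.put(rn, row));

    // FULL JOIN on row number, emitted in arrival order (possibly scrambled).
    List<Joined> joined = new ArrayList<>();
    for (String m : main) {
      Integer rn = mainRn.get(m);
      String s = subByRn.remove(rn);
      joined.add(new Joined(rn, m, s == null ? null : rn, s));
    }
    subByRn.forEach((rn, s) -> joined.add(new Joined(null, null, rn, s)));

    // Piece (b): trailing sort by both row numbers, NULLS LAST.
    Comparator<Integer> nullsLast = Comparator.nullsLast(Comparator.naturalOrder());
    joined.sort(
        Comparator.comparing((Joined j) -> j.rnMain, nullsLast)
            .thenComparing(j -> j.rnSub, nullsLast));

    List<String> out = new ArrayList<>();
    for (Joined j : joined) {
      out.add(j.main + " <-> " + j.sub);
    }
    return out;
  }

  public static void main(String[] args) {
    // Same rows, different arrival orders: both calls print the same list.
    System.out.println(appendCol(List.of("F/AK", "F/AL", "F/AR", "M/MD"), List.of("F", "M")));
    System.out.println(appendCol(List.of("F/AL", "M/MD", "F/AK", "F/AR"), List.of("M", "F")));
  }
}
```

In this model, dropping the final sort leaves correct pairs in arrival order, and replacing `numberBySortKey` with arrival-order numbering makes the pairs themselves depend on arrival order, which mirrors why neither piece is sufficient alone.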
Where the `ROW_NUMBER() OVER (...)` is emitted

The window expression is built inside `visitAppendCol` via a helper, `PlanUtils.makeOver`.

Before the fix, `mainOrderKeys` was hard-coded to `List.of()`, so DataFusion saw a bare `OVER ()` and assigned numbers arbitrarily.

Fix
Two changes in `visitAppendCol`:

- Fill the `???`: change `mainOrderKeys` from the hard-coded `List.of()` to `deriveCollationOrderKeys(context)`, which reads the upstream collation via `RelMetadataQuery.collations(peek())`. So `OVER ()` becomes `OVER (ORDER BY <upstream sort>)`, which is deterministic on DataFusion.
- Add a trailing sort by the row-number columns after the join (NULLS LAST, the same pattern `streamstats` already uses).

No behavior change on v2/Calcite (its serial execution already produced this order).
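The fallback rule for the window's order keys can be illustrated without Calcite. The following is a minimal sketch under loud assumptions: it does not call `deriveCollationOrderKeys` or `RelMetadataQuery`, collations are modelled as plain lists of ascending field names, and `deriveOrderKeys` and `rowNumberWindow` are hypothetical helpers written only to show how an empty key list degrades to the bare `OVER ()`.

```java
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical stand-in for the order-key derivation; not the Calcite API. */
final class WindowOrderKeySketch {

  /**
   * Picks the first non-empty collation (assumption: each collation is a list
   * of ascending field names). An empty result means "no known input order".
   */
  static List<String> deriveOrderKeys(List<List<String>> collations) {
    for (List<String> collation : collations) {
      if (!collation.isEmpty()) {
        return collation;
      }
    }
    return List.of();
  }

  /** Renders the window; with no keys it falls back to the bare OVER (). */
  static String rowNumberWindow(List<String> orderKeys, String alias) {
    String orderBy =
        orderKeys.isEmpty()
            ? ""
            : "ORDER BY "
                + orderKeys.stream()
                    .map(key -> "`" + key + "` NULLS LAST")
                    .collect(Collectors.joining(", "));
    return "ROW_NUMBER() OVER (" + orderBy + ") `" + alias + "`";
  }

  public static void main(String[] args) {
    // Input with a declared collation on EMPNO.
    System.out.println(
        rowNumberWindow(deriveOrderKeys(List.of(List.of("EMPNO"))), "_row_number_main_"));
    // Input with no collation: positional correspondence stays undefined.
    System.out.println(rowNumberWindow(deriveOrderKeys(List.of()), "_row_number_main_"));
  }
}
```

The two printed strings correspond to the new and old forms of the `_row_number_main_` window shown in the regenerated SparkSQL expectation.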
Results

`CalcitePPLAppendcolIT`:

Testing

- `CalcitePPLAppendcolTest` (5 unit tests, regenerated plans) ✅
- `CalcitePPLAppendcolIT` 2/2 on both paths ✅
- `NewAddedCommandsIT.testAppendcol` ✅
- `appendcol.md` doctest ✅ (no doc edits needed)
- `spotlessCheck` clean ✅

Check List

- `--signoff`.

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.